/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.engine.snapshot;

import org.apache.commons.io.FileUtils;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.FolderManager;
import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;

public class SnapshotLoader {
    private Logger LOGGER = LoggerFactory.getLogger(SnapshotLoader.class);
    private String storageGroupName;
    private String snapshotPath;
    private String dataRegionId;
    private SnapshotLogAnalyzer logAnalyzer;

    public SnapshotLoader(String snapshotPath, String storageGroupName, String dataRegionId) {
        this.snapshotPath = snapshotPath;
        this.storageGroupName = storageGroupName;
        this.dataRegionId = dataRegionId;
    }

    private DataRegion loadSnapshot() {
        try {
            return new DataRegion(
                    IoTDBDescriptor.getInstance().getConfig().getSystemDir()
                            + File.separator
                            + "databases"
                            + File.separator
                            + storageGroupName,
                    dataRegionId,
                    StorageEngine.getInstance().getFileFlushPolicy(),
                    storageGroupName);
        } catch (Exception e) {
            LOGGER.error("Exception occurs while load snapshot from {}", snapshotPath, e);
            return null;
        }
    }

    private File getSnapshotLogFile() {
        File sourceDataDir = new File(snapshotPath);

        if (sourceDataDir.exists()) {
            File[] files =
                    sourceDataDir.listFiles((dir, name) -> name.equals(SnapshotLogger.SNAPSHOT_LOG_NAME));
            if (files == null || files.length == 0) {
                LOGGER.warn("Failed to find snapshot log file, cannot recover it");
            } else if (files.length > 1) {
                LOGGER.warn(
                        "Found more than one snapshot log file, cannot recover it. {}", Arrays.toString(files));
            } else {
                LOGGER.info("Reading snapshot log file {}", files[0]);
                return files[0];
            }
        }
        return null;
    }

    /**
     * 1. Clear origin data 2. Move snapshot data to data dir 3. Load data region
     *
     * @return
     */
    public DataRegion loadSnapshotForStateMachine() {
        LOGGER.info(
                "Loading snapshot for {}-{}, source directory is {}",
                storageGroupName,
                dataRegionId,
                snapshotPath);

        File snapshotLogFile = getSnapshotLogFile();

        if (snapshotLogFile == null) {
            return loadSnapshotWithoutLog();
        } else {
            return loadSnapshotWithLog(snapshotLogFile);
        }
    }

    private DataRegion loadSnapshotWithoutLog() {
        try {
            try {
                deleteAllFilesInDataDirs();
                LOGGER.info("Remove all data files in original data dir");
            } catch (IOException e) {
                LOGGER.error("Failed to remove origin data files", e);
                return null;
            }
            LOGGER.info("Moving snapshot file to data dirs");
            try {
                deleteAllFilesInDataDirs();
                LOGGER.info("Remove all data files in original data dir");
            } catch (IOException e) {
                return null;
            }
            createLinksFromSnapshotDirToDataDirWithoutLog(new File(snapshotPath));
            return loadSnapshot();
        } catch (IOException | DiskSpaceInsufficientException e) {
            LOGGER.error(
                    "Exception occurs when loading snapshot for {}-{}", storageGroupName, dataRegionId, e);
            return null;
        }
    }

    private DataRegion loadSnapshotWithLog(File logFile) {
        boolean snapshotComplete = false;
        try {
            logAnalyzer = new SnapshotLogAnalyzer(logFile);
            snapshotComplete = logAnalyzer.isSnapshotComplete();
        } catch (Exception e) {
            LOGGER.error("Exception occurs when reading snapshot file", e);
            return null;
        }

        if (!snapshotComplete) {
            // Do not load this snapshot because it's not complete.
            LOGGER.error("This snapshot is not complete, cannot load it");
            return null;
        }

        try {
            try {
                deleteAllFilesInDataDirs();
                LOGGER.info("Remove all data files in original data dir");
                createLinksFromSnapshotDirToDataDirWithLog();
                return loadSnapshot();
            } catch (IOException e) {
                LOGGER.error("Failed to remove origin data files", e);
                return null;
            }
        } finally {
            logAnalyzer.close();
        }
    }

    private void deleteAllFilesInDataDirs() throws IOException {
        String[] dataDirPaths = IoTDBDescriptor.getInstance().getConfig().getDataDirs();

        // delete
        List<File> timePartitions = new ArrayList<>();
        for (String dataDirPath : dataDirPaths) {
            File seqDataDirForThisRegion =
                    new File(
                            dataDirPath
                                    + File.separator
                                    + IoTDBConstant.SEQUENCE_FLODER_NAME
                                    + File.separator
                                    + storageGroupName
                                    + File.separator
                                    + dataRegionId);
            if (seqDataDirForThisRegion.exists()) {
                File[] files = seqDataDirForThisRegion.listFiles();
                if (files != null) {
                    timePartitions.addAll(Arrays.asList(files));
                }
            }

            File unseqDataDirForThisRegion =
                    new File(
                            dataDirPath
                                    + File.separator
                                    + IoTDBConstant.UNSEQUENCE_FLODER_NAME
                                    + File.separator
                                    + storageGroupName
                                    + File.separator
                                    + dataRegionId);

            if (unseqDataDirForThisRegion.exists()) {
                File[] files = unseqDataDirForThisRegion.listFiles();
                if (files != null) {
                    timePartitions.addAll(Arrays.asList(files));
                }
            }
        }

        try {
            for (File timePartition : timePartitions) {
                FileUtils.forceDelete(timePartition);
            }
        } catch (IOException e) {
            LOGGER.error(
                    "Exception occurs when deleting time partition directory for {}-{}",
                    storageGroupName,
                    dataRegionId,
                    e);
            throw e;
        }
    }

    private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir)
            throws IOException, DiskSpaceInsufficientException {
        File seqFileDir =
                new File(
                        sourceDir,
                        IoTDBConstant.SEQUENCE_FLODER_NAME
                                + File.separator
                                + storageGroupName
                                + File.separator
                                + dataRegionId);
        File unseqFileDir =
                new File(
                        sourceDir,
                        IoTDBConstant.UNSEQUENCE_FLODER_NAME
                                + File.separator
                                + storageGroupName
                                + File.separator
                                + dataRegionId);
        if (!seqFileDir.exists() && !unseqFileDir.exists()) {
            throw new IOException(
                    String.format(
                            "Cannot find %s or %s",
                            seqFileDir.getAbsolutePath(), unseqFileDir.getAbsolutePath()));
        }
        FolderManager folderManager =
                new FolderManager(
                        Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()),
                        DirectoryStrategyType.SEQUENCE_STRATEGY);
        File[] timePartitionFolders = seqFileDir.listFiles();
        if (timePartitionFolders != null) {
            for (File timePartitionFolder : timePartitionFolders) {
                File[] files = timePartitionFolder.listFiles();
                if (files == null || files.length == 0) {
                    continue;
                }
                String targetSuffix =
                        IoTDBConstant.SEQUENCE_FLODER_NAME
                                + File.separator
                                + storageGroupName
                                + File.separator
                                + dataRegionId
                                + File.separator
                                + timePartitionFolder.getName();
                createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager);
            }
        }

        timePartitionFolders = unseqFileDir.listFiles();
        if (timePartitionFolders != null) {
            for (File timePartitionFolder : timePartitionFolders) {
                File[] files = timePartitionFolder.listFiles();
                if (files == null || files.length == 0) {
                    continue;
                }
                String targetSuffix =
                        IoTDBConstant.UNSEQUENCE_FLODER_NAME
                                + File.separator
                                + storageGroupName
                                + File.separator
                                + dataRegionId
                                + File.separator
                                + timePartitionFolder.getName();
                createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager);
            }
        }
    }

    private void createLinksFromSnapshotToSourceDir(
            String targetSuffix, File[] files, FolderManager folderManager)
            throws DiskSpaceInsufficientException, IOException {
        for (File file : files) {
            String dataDir = folderManager.getNextFolder();
            File targetFile =
                    new File(dataDir + File.separator + targetSuffix + File.separator + file.getName());
            if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) {
                throw new IOException(
                        String.format(
                                "Cannot create directory %s", targetFile.getParentFile().getAbsolutePath()));
            }
            try {
                Files.createLink(targetFile.toPath(), file.toPath());
                continue;
            } catch (IOException e) {
                LOGGER.info("Cannot create link from {} to {}, try to copy it", file, targetFile);
            }

            Files.copy(file.toPath(), targetFile.toPath());
        }
    }

    private void createLinksFromSnapshotDirToDataDirWithLog() throws IOException {
        String snapshotId = logAnalyzer.getSnapshotId();
        int loggedFileNum = logAnalyzer.getTotalFileCountInSnapshot();
        Set<String> fileInfoSet = logAnalyzer.getFileInfoSet();
        String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
        int fileCnt = 0;
        for (String dataDir : dataDirs) {
            String snapshotDir =
                    dataDir
                            + File.separator
                            + IoTDBConstant.SNAPSHOT_FOLDER_NAME
                            + File.separator
                            + storageGroupName
                            + "-"
                            + dataRegionId
                            + File.separator
                            + snapshotId;
            fileCnt += takeHardLinksFromSnapshotToDataDir(dataDir, new File(snapshotDir), fileInfoSet);
        }
        if (fileCnt != loggedFileNum) {
            throw new IOException(
                    String.format(
                            "The file num in log is %d, while file num in disk is %d", loggedFileNum, fileCnt));
        }
    }

    private int takeHardLinksFromSnapshotToDataDir(
            String dataDir, File snapshotFolder, Set<String> fileInfoSet) throws IOException {
        int cnt = 0;
        File sequenceTimePartitionFolders =
                new File(
                        snapshotFolder.getAbsolutePath()
                                + File.separator
                                + IoTDBConstant.SEQUENCE_FLODER_NAME
                                + File.separator
                                + storageGroupName
                                + File.separator
                                + dataRegionId);
        File[] timePartitionFolders = sequenceTimePartitionFolders.listFiles();
        if (timePartitionFolders != null) {
            for (File timePartitionFolder : timePartitionFolders) {
                String timePartition = timePartitionFolder.getName();
                File[] sourceFiles = timePartitionFolder.listFiles();
                if (sourceFiles == null) {
                    continue;
                }
                File targetDir =
                        new File(
                                dataDir
                                        + File.separator
                                        + IoTDBConstant.SEQUENCE_FLODER_NAME
                                        + File.separator
                                        + storageGroupName
                                        + File.separator
                                        + dataRegionId
                                        + File.separator
                                        + timePartition);
                createLinksFromSourceToTarget(targetDir, sourceFiles, fileInfoSet);
                cnt += sourceFiles.length;
            }
        }

        File unsequenceTimePartitionFolders =
                new File(
                        snapshotFolder.getAbsolutePath()
                                + File.separator
                                + IoTDBConstant.UNSEQUENCE_FLODER_NAME
                                + File.separator
                                + storageGroupName
                                + File.separator
                                + dataRegionId);
        timePartitionFolders = unsequenceTimePartitionFolders.listFiles();
        if (timePartitionFolders != null) {
            for (File timePartitionFolder : timePartitionFolders) {
                String timePartition = timePartitionFolder.getName();
                File[] sourceFiles = timePartitionFolder.listFiles();
                if (sourceFiles == null) {
                    continue;
                }
                File targetDir =
                        new File(
                                dataDir
                                        + File.separator
                                        + IoTDBConstant.UNSEQUENCE_FLODER_NAME
                                        + File.separator
                                        + storageGroupName
                                        + File.separator
                                        + dataRegionId
                                        + File.separator
                                        + timePartition);
                createLinksFromSourceToTarget(targetDir, sourceFiles, fileInfoSet);
                cnt += sourceFiles.length;
            }
        }

        return cnt;
    }

    private void createLinksFromSourceToTarget(File targetDir, File[] files, Set<String> fileInfoSet)
            throws IOException {
        for (File file : files) {
            String infoStr = getFileInfoString(file);
            if (!fileInfoSet.contains(infoStr)) {
                throw new IOException(
                        String.format("File %s is not in the log file list", file.getAbsolutePath()));
            }
            File targetFile = new File(targetDir, file.getName());
            if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) {
                throw new IOException(
                        String.format(
                                "Cannot create directory %s", targetFile.getParentFile().getAbsolutePath()));
            }
            Files.createLink(targetFile.toPath(), file.toPath());
        }
    }

    private String getFileInfoString(File file) {
        String[] splittedStr = file.getAbsolutePath().split(File.separator.equals("\\") ? "\\\\" : "/");
        int length = splittedStr.length;
        return splittedStr[length - SnapshotLogger.FILE_NAME_OFFSET]
                + SnapshotLogger.SPLIT_CHAR
                + splittedStr[length - SnapshotLogger.TIME_PARTITION_OFFSET]
                + SnapshotLogger.SPLIT_CHAR
                + splittedStr[length - SnapshotLogger.SEQUENCE_OFFSET];
    }

    public List<File> getSnapshotFileInfo() throws IOException {
        File snapshotLogFile = getSnapshotLogFile();

        if (snapshotLogFile == null) {
            return searchDataFilesRecursively(snapshotPath);
        } else {
            return getSnapshotFileWithLog(snapshotLogFile);
        }
    }

    private List<File> getSnapshotFileWithLog(File logFile) throws IOException {
        SnapshotLogAnalyzer analyzer = new SnapshotLogAnalyzer(logFile);
        try {
            String snapshotId = analyzer.getSnapshotId();
            String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
            List<File> fileList = new LinkedList<>();
            for (String dataDir : dataDirs) {
                String snapshotDir =
                        dataDir
                                + File.separator
                                + IoTDBConstant.SNAPSHOT_FOLDER_NAME
                                + File.separator
                                + storageGroupName
                                + "-"
                                + dataRegionId
                                + File.separator
                                + snapshotId;
                fileList.addAll(searchDataFilesRecursively(snapshotDir));
            }
            return fileList;
        } finally {
            analyzer.close();
        }
    }

    /**
     * Search all data files in one directory recursively.
     *
     * @return
     */
    private List<File> searchDataFilesRecursively(String dir) throws IOException {
        LinkedList<File> fileList = new LinkedList<>();
        Files.walkFileTree(
                new File(dir).toPath(),
                new FileVisitor<Path>() {
                    @Override
                    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
                            throws IOException {
                        return FileVisitResult.CONTINUE;
                    }

                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                            throws IOException {
                        if (SnapshotFileSet.isDataFile(file.toFile())) {
                            fileList.add(file.toFile());
                        }
                        return FileVisitResult.CONTINUE;
                    }

                    @Override
                    public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
                        return FileVisitResult.CONTINUE;
                    }

                    @Override
                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                        return FileVisitResult.CONTINUE;
                    }
                });
        return fileList;
    }
}
